202602021508 python如何在将contextVar带到子线程中
202602021508 python如何在将contextVar带到子线程中
import time
from concurrent.futures import ThreadPoolExecutor
from contextvars import copy_context, ContextVar
trace_id = ContextVar("trace_id")
def work(x):
time.sleep(x)
print("trace_id =", trace_id.get(None), "x =", x)
def submit_with_context(pool, fn, *args, **kwargs):
ctx = copy_context()
return pool.submit(ctx.run, fn, *args, **kwargs)
trace_id.set("REQ-123")
with ThreadPoolExecutor(max_workers=2) as pool:
f1 = submit_with_context(pool, work, 1)
f2 = submit_with_context(pool, work, 2)
f3 = pool.submit(work, 3)
f1.result()
f2.result()
f3.result()
会将ContextVar带到子线程中
输出:
trace_id = REQ-123 x = 1
trace_id = REQ-123 x = 2
trace_id = None x = 3
特别适合在做trace的时候,将 contextVar 传到子线程中
在代码库中包装一层:
"""上下文感知的并发工具,解决 ThreadPoolExecutor 中 contextvars 不自动传播的问题。"""
import contextvars
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Callable
class ContextAwareThreadPoolExecutor(ThreadPoolExecutor):
"""自动传播 contextvars 上下文到工作线程的 ThreadPoolExecutor, 保留主进程logid,trace等信息。
用法与标准 ThreadPoolExecutor 完全一致,只需替换 import 即可。
"""
def submit(self, fn: Callable, /, *args, **kwargs) -> Future:
"""提交任务时自动捕获当前上下文,使工作线程继承调用方的 contextvars。"""
ctx = contextvars.copy_context()
return super().submit(ctx.run, fn, *args, **kwargs)